/* Copyright (c) 2016-2020 Stanford University
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR(S) DISCLAIM ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL AUTHORS BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#ifndef RUNTIME_NANOLOG_H
#define RUNTIME_NANOLOG_H

#include <aio.h>
#include <cassert>

#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "Config.h"
#include "Common.h"
#include "Fence.h"
#include "Log.h"
#include "NanoLog.h"
#include "Util.h"

namespace NanoLogInternal {
using namespace NanoLog;

/**
 * RuntimeLogger provides runtime support to the C++ code generated by the
 * Preprocessor component.
 * Its main responsibilities are to manage fast thread-local storage to stage
 * uncompressed log messages and manage a background thread to compress the
 * log messages to an output file.
 */
    class RuntimeLogger {
    public:

        /**
         * Implementation behind registerInvocationSite(); see that function
         * for the full contract. Note that the argument order here is
         * (logId, info), the reverse of the public wrapper.
         *
         * Registration is serialized by this instance's registrationMutex,
         * and the id is only assigned if it is still UNASSIGNED_LOGID once
         * the lock is held, so a site is assigned at most one id.
         */
        inline void
        registerInvocationSite_internal(int &logId, StaticLogInfo info) {
            // TODO(syang0) Make this into a spin lock
            // Lock this instance's own mutex so that it always guards the
            // same invocationSites vector that is modified below.
            std::lock_guard<std::mutex> lock(registrationMutex);

            // The id may already have been assigned (e.g. by another thread
            // that acquired the lock first); nothing left to do in that case.
            if (logId != UNASSIGNED_LOGID)
                return;

            logId = static_cast<int32_t>(invocationSites.size());
            invocationSites.push_back(info);

#ifdef ENABLE_DEBUG_PRINTING
            printf("Registered '%s' as id=%d\r\n", info.formatString, logId);
            printf("\tisParamString [%p] = ", info.isArgString);
            for (int i = 0; i < info.numParams; ++i)
                printf("%d ", info.isArgString[i]);
            printf("\r\n");
#endif
        }

        /**
         * Assigns a globally unique identifier to static log information and
         * stages it for persistence to disk.
         *
         * \param info
         *      Static log info to associate and persist
         *
         * \param[in,out] logId
         *      Unique log identifier to be assigned. A value other than
         *      UNASSIGNED_LOGID indicates that the id has already been
         *      assigned and this function becomes a no-op.
         */
        static inline void
        registerInvocationSite(StaticLogInfo info, int &logId) {
            nanoLogSingleton.registerInvocationSite_internal(logId, info);
        }

        /**
         * Allocate thread-local space for the generated C++ code to store an
         * uncompressed log message, but do not make it available for compression
         * yet. The caller should invoke finishAlloc() to make the space visible
         * to the compression thread and this function shall not be invoked
         * again until the corresponding finishAlloc() is invoked first.
         *
         * The calling thread's StagingBuffer is allocated lazily on first use.
         * Note this will block if the buffer is full.
         *
         * \param nbytes
         *      Number of bytes to allocate in the calling thread's
         *      StagingBuffer
         *
         * \return
         *      Pointer to the allocated space
         */
        static inline char *
        reserveAlloc(size_t nbytes) {
            if (stagingBuffer == nullptr)
                nanoLogSingleton.ensureStagingBufferAllocated();

            // NOLINTNEXTLINE(clang-analyzer-core.CallAndMessage)
            return stagingBuffer->reserveProducerSpace(nbytes);
        }

        /**
         * Complement to reserveAlloc, makes the bytes previously
         * reserveAlloc()-ed visible to the compression/output thread.
         *
         * Must only be called after a reserveAlloc() on the same thread; the
         * thread-local StagingBuffer is not checked for nullptr here.
         *
         * \param nbytes
         *      Number of bytes to make visible
         */
        static inline void
        finishAlloc(size_t nbytes) {
            stagingBuffer->finishReservation(nbytes);
        }

        // Public API implemented out-of-line.
        static std::string getStats();
        static std::string getHistograms();
        static void preallocate();
        static void setLogFile(const char *filename);
        static void setLogLevel(LogLevel logLevel);
        static void sync();

        /**
         * Returns the minimum log level the RuntimeLogger currently accepts.
         */
        static inline LogLevel getLogLevel() {
            return nanoLogSingleton.currentLogLevel;
        }

        /**
         * Returns the last core id the background thread was recorded
         * running on.
         */
        static inline int getCoreIdOfBackgroundThread() {
            return nanoLogSingleton.coreId;
        }
    PRIVATE:

        // Forward Declarations
        class StagingBuffer;
        class StagingBufferDestroyer;

        // Storage for staging uncompressed log statements for compression.
        // Allocated lazily per thread by ensureStagingBufferAllocated().
        static __thread StagingBuffer *stagingBuffer;

        // Destroys the __thread StagingBuffer upon its own destruction, which
        // is synchronized with thread death
        static thread_local StagingBufferDestroyer sbc;

        // Singleton RuntimeLogger that manages the thread-local structures and
        // background output thread.
        static RuntimeLogger nanoLogSingleton;

        RuntimeLogger();

        ~RuntimeLogger();

        // Implemented out-of-line.
        void compressionThreadMain();

        void setLogFile_internal(const char *filename);

        void waitForAIO();

        /**
         * Allocates thread-local structures if they weren't already allocated.
         * This is used by the generated C++ code to ensure it has space to
         * log uncompressed messages to and by the user if they wish to
         * preallocate the data structures on thread creation.
         *
         * The buffer id is drawn under bufferMutex, the (expensive) allocation
         * is performed with the lock released, and the new buffer is then
         * registered in threadBuffers with the lock held again.
         */
        inline void
        ensureStagingBufferAllocated() {
            if (stagingBuffer == nullptr) {
                std::unique_lock<std::mutex> guard(bufferMutex);
                uint32_t bufferId = nextBufferId++;

                // Unlocked for the expensive StagingBuffer allocation
                guard.unlock();
                stagingBuffer = new StagingBuffer(bufferId);
                guard.lock();

                threadBuffers.push_back(stagingBuffer);
            }
        }

        // Global list of the thread-local StagingBuffers that are still live;
        // the compression thread removes entries once checkCanDelete() holds.
        std::vector<StagingBuffer *> threadBuffers;

        // Stores the id for the next StagingBuffer to be allocated. The ids are
        // unique for this execution for each StagingBuffer allocation.
        uint32_t nextBufferId = 1;

        // Protects reads and writes to threadBuffers
        std::mutex bufferMutex;

        // Background thread that polls the various staging buffers, compresses
        // the staged log messages, and outputs it to a file.
        std::thread compressionThread;

        // Indicates there's an operation in aioCb that should be waited on
        bool hasOutstandingOperation;

        // Flag signaling the compressionThread to stop running. This is
        // typically only set in testing or when the application is exiting.
        bool compressionThreadShouldExit;

        // Marks the progress of flushing all log messages to disk after a user
        // invokes the sync() API. To complete the operation, the background
        // thread has to make two passes through the staging buffers and wait
        // on the AIO to complete before waking up the user thread.
        enum {
            SYNC_REQUESTED,         // User invoked a sync() operation
            PERFORMING_SECOND_PASS, // Background thread is making a second pass
            WAITING_ON_AIO,         // Background thread is waiting on AIO
            SYNC_COMPLETED          // Operation complete/no requests
        } syncStatus;

        // Protects the condition variables below
        std::mutex condMutex;

        // Signal for when the compression thread should wakeup
        std::condition_variable workAdded;

        // Signaled when the background thread completes a sync() operation and
        // the user thread should wake up.
        std::condition_variable hintSyncCompleted;

        // File handle for the output file; should only be opened once at the
        // construction of the LogCompressor
        int outputFd;

        // POSIX AIO structure used to communicate async IO requests
        struct aiocb aioCb;

        // Dynamically allocated buffer to stage compressed log message before
        // handing it over to the POSIX AIO library for output.
        char *compressingBuffer;

        // Dynamically allocated double buffer that is swapped with the
        // compressingBuffer when the latter is passed to the POSIX AIO library.
        char *outputDoubleBuffer;

        // Minimum log level that RuntimeLogger will accept. Anything lower will
        // be dropped.
        LogLevel currentLogLevel;

        // Marks the rdtsc() when the current compression thread first started
        // running. A value of 0 indicates the compression thread is not running
        uint64_t cycleAtThreadStart;

        // Marks the rdtsc() when the last I/O operation started
        uint64_t cyclesAtLastAIOStart;

        // Metric: Number of cycles compression thread is doing work
        uint64_t cyclesActive;

        // Metric: Amount of time spent compressing the dynamic log data
        uint64_t cyclesCompressing;

        // Metric: Stores the distribution of StagingBuffer peek sizes in 5%
        // increments relative to the full size. This distribution should show
        // how well the background thread keeps up with the logging threads.
        uint64_t stagingBufferPeekDist[20];

        // Metric: Amount of time spent scanning the buffers for work and
        // compressing events found.
        uint64_t cyclesScanningAndCompressing;

        // Metric: Upper bound on the amount of time spent on fsync() and disk
        // writes. It is an upper bound since the code polls for the async IO
        // to complete, so any time between completion and the next poll is
        // also counted.
        uint64_t cyclesDiskIO_upperBound;

        // Metric: Number of bytes read in from the staging buffers
        uint64_t totalBytesRead;

        // Metric: Number of bytes written to the output file (includes padding)
        uint64_t totalBytesWritten;

        // Metric: Number of pad bytes written to round the file to the nearest
        // 512B
        uint64_t padBytesWritten;

        // Metric: Number of log statements compressed and outputted.
        uint64_t logsProcessed;

        // Metric: Number of times an AIO write was completed.
        uint32_t numAioWritesCompleted;

        // Stores the last coreId that the background thread ran in.
        int coreId;

        // Used to control access to invocationSites
        std::mutex registrationMutex;

        // Maps unique identifiers to log invocation sites encountered thus far
        // by the non-preprocessor version of NanoLog
        std::vector<StaticLogInfo> invocationSites;

        // Indicates the index of the next invocationSite that needs to be
        // persisted to disk.
        uint32_t nextInvocationIndexToBePersisted;

        /**
         * Implements a circular FIFO producer/consumer byte queue that is used
         * to hold the dynamic information of a NanoLog log statement (producer)
         * as it waits for compression via the NanoLog background thread
         * (consumer). There exists a StagingBuffer for every thread that uses
         * the NanoLog system.
         */
        class StagingBuffer {
        public:
            /**
             * Attempt to reserve contiguous space for the producer without
             * making it visible to the consumer. The caller should invoke
             * finishReservation() before invoking reserveProducerSpace()
             * again to make the bytes reserved visible to the consumer.
             *
             * This mechanism is in place to allow the producer to initialize
             * the contents of the reservation before exposing it to the
             * consumer. This function will block behind the consumer if
             * there's not enough space.
             *
             * \param nbytes
             *      Number of bytes to allocate
             *
             * \return
             *      Pointer to at least nbytes of contiguous space
             */
            inline char *
            reserveProducerSpace(size_t nbytes) {
                ++numAllocations;

                // Fast in-line path: the request fits in the space already
                // known to be free, so no roll-over or stall is possible.
                if (nbytes < minFreeSpace)
                    return producerPos;

                // Slow allocation
                return reserveSpaceInternal(nbytes);
            }

            /**
             * Complement to reserveProducerSpace that makes nbytes starting
             * from the return of reserveProducerSpace visible to the consumer.
             *
             * \param nbytes
             *      Number of bytes to expose to the consumer
             */
            inline void
            finishReservation(size_t nbytes) {
                assert(nbytes < minFreeSpace);
                assert(producerPos + nbytes <
                       storage + NanoLogConfig::STAGING_BUFFER_SIZE);

                Fence::sfence(); // Ensures producer finishes writes before bump
                minFreeSpace -= nbytes;
                producerPos += nbytes;
            }

            // Consumer-side inspection of the staged bytes; implemented
            // out-of-line. NOTE(review): presumably reports the contiguous
            // byte count via *bytesAvailable -- confirm against definition.
            char *peek(uint64_t *bytesAvailable);

            /**
             * Consumes the next nbytes in the StagingBuffer and frees it back
             * for the producer to reuse. nbytes must not exceed the number of
             * bytes reported as available by peek().
             *
             * \param nbytes
             *      Number of bytes to return back to the producer
             */
            inline void
            consume(uint64_t nbytes) {
                Fence::lfence(); // Make sure consumer reads finish before bump
                consumerPos += nbytes;
            }

            /**
             * Returns true if it's safe for the compression thread to delete
             * the StagingBuffer and remove it from the global vector, i.e. the
             * owning thread has exited and the buffer has been fully drained.
             *
             * \return
             *      true if its safe to delete the StagingBuffer
             */
            bool
            checkCanDelete() const {
                return shouldDeallocate && consumerPos == producerPos;
            }

            /**
             * Returns the execution-unique id assigned at construction.
             */
            uint32_t getId() const {
                return id;
            }

            /**
             * Constructs an empty StagingBuffer.
             *
             * \param bufferId
             *      Execution-unique identifier for this buffer.
             */
            explicit StagingBuffer(uint32_t bufferId)
                    : producerPos(storage)
                    , endOfRecordedSpace(storage
                                           + NanoLogConfig::STAGING_BUFFER_SIZE)
                    , minFreeSpace(NanoLogConfig::STAGING_BUFFER_SIZE)
                    , cyclesProducerBlocked(0)
                    , numTimesProducerBlocked(0)
                    , numAllocations(0)
                    // Value-initialization zero-fills the whole histogram.
                    , cyclesProducerBlockedDist()
                    , cyclesIn10Ns(PerfUtils::Cycles::fromNanoseconds(10))
                    , cacheLineSpacer()
                    , consumerPos(storage)
                    , shouldDeallocate(false)
                    , id(bufferId)
                    // Value-initialization zero-fills the backing store.
                    , storage() {
                // Empty function, but causes the C++ runtime to instantiate the
                // sbc thread_local (see
                // StagingBufferDestroyer::stagingBufferCreated()).
                sbc.stagingBufferCreated();
            }

            ~StagingBuffer() {
            }

        PRIVATE:

            char *reserveSpaceInternal(size_t nbytes, bool blocking = true);

            // Position within storage[] where the producer may place new data
            char *producerPos;

            // Marks the end of valid data for the consumer. Set by the producer
            // on a roll-over
            char *endOfRecordedSpace;

            // Lower bound on the number of bytes the producer can allocate w/o
            // rolling over the producerPos or stalling behind the consumer
            uint64_t minFreeSpace;

            // Number of cycles producer was blocked while waiting for space to
            // free up in the StagingBuffer for an allocation.
            uint64_t cyclesProducerBlocked;

            // Number of times the producer was blocked while waiting for space
            // to free up in the StagingBuffer for an allocation
            uint32_t numTimesProducerBlocked;

            // Number of alloc()'s performed
            uint64_t numAllocations;

            // Distribution of the number of times Producer was blocked
            // allocating space in 10ns increments. The last slot includes
            // all times greater than the last increment.
            uint32_t cyclesProducerBlockedDist[20];

            // Number of Cycles in 10ns. This is used to avoid the expensive
            // Cycles::toNanoseconds() call to calculate the bucket in the
            // cyclesProducerBlockedDist distribution.
            uint64_t cyclesIn10Ns;

            // An extra cache-line to separate the variables that are primarily
            // updated/read by the producer (above) from the ones by the
            // consumer(below)
            char cacheLineSpacer[2*Util::BYTES_PER_CACHE_LINE];

            // Position within the storage buffer where the consumer will consume
            // the next bytes from. This value is only updated by the consumer.
            char* volatile consumerPos;

            // Indicates that the thread owning this StagingBuffer has been
            // destructed (i.e. no more messages will be logged to it) and thus
            // should be cleaned up once the buffer has been emptied by the
            // compression thread.
            bool shouldDeallocate;

            // Uniquely identifies this StagingBuffer for this execution. It's
            // similar to ThreadId, but is only assigned to threads that log
            // via NanoLog (i.e. that allocate a StagingBuffer).
            uint32_t id;

            // Backing store used to implement the circular queue
            char storage[NanoLogConfig::STAGING_BUFFER_SIZE];

            friend RuntimeLogger;
            friend StagingBufferDestroyer;

            DISALLOW_COPY_AND_ASSIGN(StagingBuffer);
        };

        // This class is intended to be instantiated as a C++ thread_local to
        // synchronize marking the thread local stagingBuffer for deletion with
        // thread death.
        //
        // The reason why this class exists rather than wrapping the stagingBuffer
        // in a unique_ptr or declaring the stagingBuffer itself to be thread_local
        // is because of performance. Dereferencing the former costs 10 ns and the
        // latter allocates large amounts of resources for every thread that is
        // created, which is wasteful for threads that do not use the RuntimeLogger.
        class StagingBufferDestroyer {
        public:
            // TODO(syang0) I wonder if it'll be better if stagingBuffer was
            // actually a thread_local wrapper with dereference operators
            // implemented.

            explicit StagingBufferDestroyer() {
            }

            // Weird C++ hack; C++ thread_local objects are only constructed
            // once they are first used in a thread, thus the StagingBuffer has
            // to invoke this function in order to instantiate this object.
            void stagingBufferCreated() {}

            // Runs at thread exit: hands the thread's StagingBuffer over to
            // the compression thread for deletion (it is not freed here).
            virtual ~StagingBufferDestroyer() {
                if (stagingBuffer != nullptr) {
                    stagingBuffer->shouldDeallocate = true;
                    stagingBuffer = nullptr;
                }
            }
        };

        DISALLOW_COPY_AND_ASSIGN(RuntimeLogger);
    };  // RuntimeLogger
}  // namespace NanoLogInternal

// MUST appear at the very end of the RuntimeLogger.h file, right before the
// last #endif. It serves as a marker for the preprocessor, indicating where it
// can start injecting inlined, generated functions.
//
// NOTE(review): identifiers beginning with a double underscore are reserved
// for the implementation; the name is presumably kept because the
// preprocessor searches for this exact text -- do not rename or reformat
// without updating the preprocessor.
static const int __internal_dummy_variable_marker_for_code_injection = 0;

#endif /* RUNTIME_NANOLOG_H */

